package com.english.websocket.dealer;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.english.websocket.constants.ProjectConstants;
import com.english.websocket.pojo.RequestParamPojo;
import com.english.websocket.processor.MediaTypeProcessor;
import com.english.websocket.queue.QueueManage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.*;

/**
 * {@link RequestDataDealer} that turns one request template plus a set of media files into a
 * sequence of request frames and hands each frame to {@link QueueManage}.
 *
 * <p>Every key of the {@code requestPath} map is a JSON path inside the template whose last
 * segment names the media type (for example {@code ...audio}); the value is a file path relative
 * to the resolved resource directory. The sibling nodes {@code status}, {@code seq} and
 * {@code timestamp} of that path are filled in per frame.
 */
public class CommonRequestDataDealer implements RequestDataDealer {
    private static final Logger logger = LoggerFactory.getLogger(CommonRequestDataDealer.class);

    /** Pause between two produced frames in milliseconds; also the step added to the frame timestamp. */
    private static final int TIME_INTERVAL_MILLIS = 40;

    /**
     * Directory that media file paths are appended to, or {@code null} when it could not be
     * resolved (every file lookup then fails with the "resource file does not exist" error).
     */
    private static final String RESOURCE_PATH = resolveResourcePath();

    /**
     * Resolves the classpath root and rewrites {@code target/classes} to {@code src/main/resources}
     * so that files are read from the source tree.
     *
     * @return the resolved directory path, or {@code null} if the classpath root is unavailable
     */
    private static String resolveResourcePath() {
        // getResource may return null; guarding here keeps class initialization from failing.
        java.net.URL classpathRoot = CommonRequestDataDealer.class.getResource("/");
        if (classpathRoot == null) {
            logger.warn("classpath root is not available, resource files cannot be located");
            return null;
        }
        try {
            String path = classpathRoot.toURI().getPath();
            // Literal replacement: no regular expression is intended here.
            return path == null ? null : path.replace("target/classes", "src/main/resources");
        } catch (URISyntaxException e) {
            logger.error("failed to resolve resource path", e);
            return null;
        }
    }

    /**
     * Builds the request frames for all configured media channels and produces them to the queue,
     * sleeping {@value #TIME_INTERVAL_MILLIS} ms after each frame.
     *
     * @param originalData request template; it is copied for every frame and not modified here
     * @param requestPath  JSON path of each media node mapped to its resource file path
     * @param appId        value written to {@code $.header.app_id} of every frame
     * @throws Exception if preparing the media data fails or the thread is interrupted while sleeping
     */
    @Override
    public void collectData(JSONObject originalData, Map<String, String> requestPath, String appId) throws Exception {
        if (requestPath == null || requestPath.isEmpty()) {
            // Without any channel there is no frame to build (and no maximum to compute).
            logger.warn("no request path configured, nothing to produce");
            return;
        }

        Map<String, List<RequestParamPojo>> requestParamPojoMap = this.prepareData(originalData, requestPath);
        // Decided per call so that an earlier request cannot leak its mode into this one.
        boolean multiMode = isMultiMode(requestParamPojoMap.keySet());

        // The channel with the most segments determines how many frames are sent.
        int maxCount = 0;
        for (List<RequestParamPojo> requestParamPojoList : requestParamPojoMap.values()) {
            maxCount = Math.max(maxCount, requestParamPojoList.size());
        }

        // Serialized once; each frame starts from a fresh deep copy parsed from this text.
        String template = originalData.toJSONString();
        long syncTimestamp = System.currentTimeMillis();

        // One iteration per frame to send.
        for (int i = 0; i < maxCount; i++) {
            JSONObject requestJsonObject = JSONObject.parseObject(template);
            List<Integer> headerStatus = new ArrayList<>();

            for (Map.Entry<String, List<RequestParamPojo>> entry : requestParamPojoMap.entrySet()) {
                String jsonPath = entry.getKey();
                int lastDot = jsonPath.lastIndexOf('.');
                String jsonPathPrefix = jsonPath.substring(0, lastDot);
                List<RequestParamPojo> segments = entry.getValue();

                if (i >= segments.size()) {
                    // This channel has already sent all of its segments: drop it from the frame.
                    JSONPath.remove(requestJsonObject, jsonPathPrefix);
                    continue;
                }

                RequestParamPojo requestParamPojo = segments.get(i);
                headerStatus.add(requestParamPojo.getStatus());

                JSONPath.set(requestJsonObject, jsonPath, Base64.getEncoder().encodeToString(requestParamPojo.getByteContent()));
                JSONPath.set(requestJsonObject, jsonPathPrefix + ".status", requestParamPojo.getStatus());
                JSONPath.set(requestJsonObject, jsonPathPrefix + ".seq", i);

                String mediaType = jsonPath.substring(lastDot + 1);
                if (mediaType.equals(ProjectConstants.AUDIO) || mediaType.equals(ProjectConstants.VIDEO)) {
                    Object timestamp = JSONPath.eval(requestJsonObject, jsonPathPrefix + ".timestamp");
                    if (multiMode || timestamp != null) {
                        // NOTE(review): the timestamp advances once per audio/video channel, so
                        // channels of the same frame get different values - confirm this is intended.
                        syncTimestamp = syncTimestamp + TIME_INTERVAL_MILLIS;
                        JSONPath.set(requestJsonObject, jsonPathPrefix + ".timestamp", Long.toString(syncTimestamp));
                    }
                }
            }

            // The header carries the smallest status among the channels present in this frame.
            Integer minHeaderStatus = Collections.min(headerStatus);
            JSONPath.set(requestJsonObject, "$.header.app_id", appId);
            JSONPath.set(requestJsonObject, "$.header.status", minHeaderStatus);
            logger.info("produce reqData to queue:{}", requestJsonObject);

            QueueManage.getInstance().productRequestParamData(requestJsonObject);
            Thread.sleep(TIME_INTERVAL_MILLIS);
        }
    }

    /**
     * Loads the media segments for every configured JSON path.
     *
     * <p>When the template's sibling {@code status} node equals
     * {@link ProjectConstants#STATUS_END} the file is passed to
     * {@code buildStreamData(file, null, false)}; otherwise it is passed to {@code dealMediaData}.
     *
     * @param originalData request template the {@code status} node is read from
     * @param requestPath  JSON path of each media node mapped to its resource file path
     * @return the segment list of every JSON path, keyed by that path
     * @throws IOException      if the media processor fails to read a file
     * @throws RuntimeException if a configured resource file does not exist
     */
    private Map<String, List<RequestParamPojo>> prepareData(JSONObject originalData, Map<String, String> requestPath) throws IOException {
        Map<String, List<RequestParamPojo>> requestParamPojoMap = new HashMap<>();

        for (Map.Entry<String, String> entry : requestPath.entrySet()) {
            String jsonPath = entry.getKey();
            String filePath = entry.getValue();

            File file = new File(RESOURCE_PATH + filePath);
            if (!file.exists()) {
                throw new RuntimeException("资源文件不存在");
            }

            String statusJsonPath = jsonPath.substring(0, jsonPath.lastIndexOf('.')) + ".status";
            Object statusJsonObject = JSONPath.eval(originalData, statusJsonPath);

            // Compare by value so the result does not depend on boxed-object identity.
            boolean isStreamInput = statusJsonObject == null
                    || ((Integer) statusJsonObject).intValue() != ProjectConstants.STATUS_END;

            if (isStreamInput) {
                requestParamPojoMap.put(jsonPath, MediaTypeProcessor.getInstance().dealMediaData(file, originalData, jsonPath));
            } else {
                requestParamPojoMap.put(jsonPath, MediaTypeProcessor.getInstance().buildStreamData(file, null, false));
            }
        }
        return requestParamPojoMap;
    }

    /**
     * Tells whether the given JSON paths contain both an audio and a video channel, judged by the
     * last segment of each path.
     *
     * @param jsonPaths JSON paths of the configured media nodes
     * @return {@code true} if at least one path ends in the audio type and one in the video type
     */
    private static boolean isMultiMode(Collection<String> jsonPaths) {
        boolean hasAudio = false;
        boolean hasVideo = false;
        for (String jsonPath : jsonPaths) {
            String mediaType = jsonPath.substring(jsonPath.lastIndexOf('.') + 1);
            if (mediaType.equals(ProjectConstants.AUDIO)) {
                hasAudio = true;
            } else if (mediaType.equals(ProjectConstants.VIDEO)) {
                hasVideo = true;
            }
        }
        return hasAudio && hasVideo;
    }
}
